Conversation

@titaneric (Contributor) commented on Oct 15, 2025:

Summary

As discussed in #23597 and #23982, this PR adds support for the Kubernetes logs API to tail pod logs.

Features Implemented

  • Kubernetes API Logs Collection: Added a log_collection_strategy configuration option to enable log collection via the Kubernetes API instead of file-based tailing
  • Event-driven Reconciler: Implemented a reconciler that watches pod events and automatically starts/stops log tailers for running pods
  • Container Log Streaming: Added real-time log streaming from the Kubernetes API with proper timestamp tracking and container identification
  • Pod Information Management: Created a PodInfo struct for essential pod metadata extraction and container tracking (a rough sketch of such a struct follows this list)
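
A rough sketch of the kind of data such a struct might carry (field names here are illustrative assumptions, not the PR's actual definition):

// Sketch only: just enough pod metadata to identify running containers and to
// annotate the log lines streamed for each of them.
#[derive(Clone, Debug)]
pub struct PodInfo {
    pub name: String,
    pub namespace: String,
    pub uid: String,
    pub node_name: String,
    /// Containers whose logs should currently be tailed.
    pub containers: Vec<String>,
}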

TODO Items

  • Batched Line Sending: Batch lines before sending them to the event channel
  • Position Tracking: Implement timestamp-based position management for log continuity
  • Error Handling: Add comprehensive error handling and retry mechanisms
  • Metrics Integration: Add metrics for API log collection monitoring
  • Metadata Annotation: Complete integration with pod metadata annotation pipeline
  • Performance Optimization: Implement connection pooling and efficient streaming
  • Testing Suite: Add comprehensive unit and integration tests

Vector configuration

api:
  enabled: true
  graphql: true
  playground: true
  address: "127.0.0.1:8686"
sources:
  my_source_id:
    type: kubernetes_logs
    data_dir: /tmp/vector-k8s-logs
    kube_config_file: ./kind-kubeconfig.yaml
    self_node_name: kind-worker
    api_log: true
sinks:
  my_sink_id:
    type: console
    inputs:
    - my_source_id
    encoding:
      codec: raw_message

How did you test this PR?

Given the following kind cluster config named kind-cluster.yaml

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker

Create a kind cluster from the config

kind create cluster --config kind-cluster.yaml

Export the cluster's kubeconfig to kind-kubeconfig

kind export kubeconfig --kubeconfig kind-kubeconfig

Apply the following manifest, saved as pod.yaml, to the kind cluster

kubectl apply -f pod.yaml --kubeconfig kind-kubeconfig

pod.yaml:
apiVersion: v1
kind: Pod
metadata:
  name: multi-container-test-pod
  labels:
    app: multi-container-test
    vector.dev/exclude: "false"
spec:
  containers:
  - name: logger-a
    image: busybox:1.35
    command: [ "/bin/sh" ]
    args:
    - -c
    - |
      echo "Container A starting at $(date)"
      counter=1
      while true; do
        echo "[A-${counter}] Container A log: $(date -Iseconds) - Testing multi-container timestamp tracking"
        counter=$((counter + 1))
        sleep 3
      done
    resources:
      requests:
        memory: "32Mi"
        cpu: "50m"
      limits:
        memory: "64Mi"
        cpu: "100m"
  - name: logger-b
    image: busybox:1.35
    command: [ "/bin/sh" ]
    args:
    - -c
    - |
      echo "Container B starting at $(date)"
      counter=1
      while true; do
        echo "[B-${counter}] Container B log: $(date -Iseconds) - Different container, same pod"
        counter=$((counter + 1))
        sleep 7
      done
    resources:
      requests:
        memory: "32Mi"
        cpu: "50m"
      limits:
        memory: "64Mi"
        cpu: "100m"
  restartPolicy: Always

Build Vector with the necessary features and run it with the given config.

cargo build --no-default-features --features sources-kubernetes_logs --features sinks-console --features api
./target/debug/vector --config vector.yaml -v

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook; please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details here.

@github-actions bot added the domain: sources label (Anything related to the Vector's sources) on Oct 15, 2025
@titaneric titaneric marked this pull request as ready for review October 17, 2025 17:49
@titaneric titaneric requested a review from a team as a code owner October 17, 2025 17:49
@titaneric (Contributor, Author) commented:

@pront, I have introduced the new config option log_collection_strategy, and I have done a fair amount of refactoring since then.
I know the code review could be time-consuming, so please take your time with it.

I hope the overall structure is good; we can then focus later on the temporary workaround for handling Line and the rest of the event-processing pipeline for the API log collection strategy.

return;
// TODO: fix it until `FunctionTransform` supports Api logs
// emit!(ParserMatchError { value: &s[..] });
drop(log.insert(&message_path, Value::Bytes(s)));
@titaneric (Contributor, Author) commented on Oct 17, 2025:

I know this is an ugly workaround. To keep the PR at a more reasonable review size, I dropped the implementation of the FunctionTransform trait for Api here (which would be very similar to how Cri handles the logs).

@pront pront self-assigned this Oct 20, 2025
Comment on lines 289 to 302
    /// The strategy to use for log collection.
    log_collection_strategy: LogCollectionStrategy,
}

/// Configuration for the log collection strategy.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum LogCollectionStrategy {
    /// Collect logs by reading log files from the filesystem.
    File,
    /// Collect logs via the Kubernetes Logs API.
    Api,
}
A reviewer (Contributor) commented:

This needs a default value in order not to break existing users. I think this works:

Suggested change, from:

    /// The strategy to use for log collection.
    log_collection_strategy: LogCollectionStrategy,
}

/// Configuration for the log collection strategy.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum LogCollectionStrategy {
    /// Collect logs by reading log files from the filesystem.
    File,
    /// Collect logs via the Kubernetes Logs API.
    Api,
}

to:

    /// The strategy to use for log collection.
    #[serde(default)]
    log_collection_strategy: LogCollectionStrategy,
}

/// Configuration for the log collection strategy.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum LogCollectionStrategy {
    /// Collect logs by reading log files from the filesystem.
    #[default]
    File,
    /// Collect logs via the Kubernetes Logs API.
    Api,
}

You also need to run make generate-component-docs in order to generate documentation for this and then we can see if it all looks ok there too
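
For illustration, a standalone sketch of the behavior this buys (simplified, hypothetical types rather than Vector's own; assumes the serde and serde_yaml crates): omitting the option falls back to the file strategy, so existing configs keep working.

use serde::Deserialize;

#[derive(Debug, Default, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum Strategy {
    #[default]
    File,
    Api,
}

#[derive(Debug, Deserialize)]
struct Config {
    // Falls back to Strategy::default() (i.e. File) when the key is absent.
    #[serde(default)]
    log_collection_strategy: Strategy,
}

fn main() {
    let cfg: Config = serde_yaml::from_str("{}").unwrap();
    assert_eq!(cfg.log_collection_strategy, Strategy::File);

    let cfg: Config = serde_yaml::from_str("log_collection_strategy: api").unwrap();
    assert_eq!(cfg.log_collection_strategy, Strategy::Api);
}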

@titaneric (Contributor, Author) replied:

The rendered documentation is shown in the screenshot below.
[screenshot: rendered component documentation]

let reflector_stream = BroadcastStream::new(reflector_rx).filter_map(|result| {
    ready(match result {
        Ok(event) => Some(Ok(event)),
        Err(_) => None,
@thomasqueirozb (Contributor) commented on Nov 14, 2025:

Silently dropping the error here is a bad idea. I have recently come across an issue (#24220) where a broadcast channel is erroring due to RecvError::Lagged. This will silently drop logs.

I am wary of using a broadcast channel here for the same reason listed in that issue. IMO a remediation needs to be explored for #24220 and the same strategy needs to be applied here.
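
A minimal sketch of that direction, reusing the names from the snippet above (the warn! wording is illustrative, not the PR's actual log line): match the Lagged error explicitly so dropped pod events are at least visible.

use futures::{future::ready, StreamExt};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::warn;

let reflector_stream = BroadcastStream::new(reflector_rx).filter_map(|result| {
    ready(match result {
        Ok(event) => Some(Ok(event)),
        Err(BroadcastStreamRecvError::Lagged(skipped)) => {
            // Make the data loss observable instead of silently discarding it.
            warn!(message = "Pod event stream lagged; events were skipped.", skipped);
            None
        }
    })
});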

log_namespace,
);

// TODO: annotate the logs with pods's metadata
A reviewer (Contributor) commented:

Suggested change, from:

// TODO: annotate the logs with pods's metadata

to:

// TODO: annotate the logs with pods's metadata. Need to honor the `insert_namespace_fields` flag

Minor addition that can help guide the future implementation


// Process the stream by reading line by line
loop {
    match log_stream.read_until(b'\n', &mut buffer).await {
A reviewer (Contributor) commented:

This looks very similar to read_until_with_max_size from lib/file-source-common/src/buffer.rs. Maybe that can be used here.

@titaneric (Contributor, Author) replied:

The log_stream implements futures::AsyncBufRead, but read_until_with_max_size expects a tokio::io::AsyncBufRead. Unless we use a compat crate such as async-compat, it doesn't seem easy to make the two work together.

A reviewer (Contributor) replied:

No need to change it then; this was an optional suggestion. I'll take a look at this myself later.
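
If someone does pick this up later, one possible route is an adapter. A minimal sketch of the idea, under stated assumptions (the tokio-util crate with its "compat" feature is available, log_stream is the futures::AsyncBufRead from the hunk above and is Unpin, and this fragment lives inside the existing async read loop):

use tokio::io::AsyncBufReadExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;

// Adapt the futures-flavoured stream so tokio-based helpers can drive it.
let mut log_stream = log_stream.compat();

// The plain tokio read_until now works on the adapted stream; the same idea
// would apply to helpers expecting tokio::io::AsyncBufRead, such as the
// read_until_with_max_size mentioned above.
let mut buffer = Vec::new();
let bytes_read = log_stream.read_until(b'\n', &mut buffer).await?;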

let line = Line {
    text: line_bytes,
    filename: String::new(), // Filename is not applicable for k8s logs
    file_id: FileFingerprint::FirstLinesChecksum(0),
@thomasqueirozb (Contributor) commented on Nov 14, 2025:

This is not correct and will likely cause issues with the file source implementation due to referencing files with the same checksum. Maybe a new fingerprint type here is warranted
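
One cheap interim option, sketched here only to show the general idea (not a proposal for the final fingerprint type): derive a stable per-container value instead of a constant, so two API-tailed containers can never share a checkpoint entry.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash namespace, pod UID, and container name into a u64 so each API-tailed
// container gets its own identity rather than a shared constant.
// (A real implementation would want a hash that is stable across versions.)
fn container_fingerprint(namespace: &str, pod_uid: &str, container: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    (namespace, pod_uid, container).hash(&mut hasher);
    hasher.finish()
}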

@titaneric titaneric requested a review from a team as a code owner November 17, 2025 17:48
@github-actions bot added the domain: external docs label (Anything related to Vector's external, public documentation) on Nov 17, 2025
@pront pront removed their assignment Nov 18, 2025